package worker

import (
	"context"
	"fmt"
	rpcXServer "github.com/smallnest/rpcx/server"
	"gogame/game/manage/config"
	"gogame/game/manage/model"
	"gogame/gameservice"
	"gogame/gameservice/service"
	"gogame/lib"
	"gogame/logger"
	"gogame/pb"
	"gogame/server/rpcx"
	"gogame/server/rpcx/param"
	"gogame/server/rpcx/server"
	"net"
	"strings"
	"sync"
)

type Service struct {
	service.Service
	ApiHandle                             // 接口处理
	RpcXServerListener *server.RpcXServer // rpcXServer

	tagRWMutex sync.RWMutex        // 别名读写锁
	Tag2Conn   map[string]net.Conn // 别名对应的rpcXClient服务对象
}

// NewService 创建worker service
func NewService() service.IService {
	return &Service{
		ApiHandle: ApiHandle{},
		Tag2Conn:  make(map[string]net.Conn),
	}
}

func init() {
	gameservice.Service2NewFunc["worker"] = NewService
}

// GetServiceByRand2Worker 随机获取一个worker服务
func GetServiceByRand2Worker() *Service {
	_service := gameservice.GetServiceByRand2Worker()
	if _service != nil {
		_returnService, _ := _service.(*Service)
		return _returnService
	}
	return nil
}

// StartServer 开始服务
func (s *Service) StartServer(serviceName, listen string) {
	// 初始化日志
	logger.InitLog("worker", serviceName)
	// rpcx.Server服务开启
	s.RpcXServerListener = server.NewRpcXServer(listen, serviceName)
	s.RpcXServerListener.RouteFunc = s.InitRoute
	s.RpcXServerListener.HandleConnCloseFunc = s.HandleConnClose
	go s.RpcXServerListener.Listen("tcp", listen)

}

// StopServer 停止服务
func (s *Service) StopServer() {
	s.RpcXServerListener.Close()

	// modelLog停止写入
	model.StopWriteModelLog()
}

// HandleNotice 处理服务通知
func (s *Service) HandleNotice(data []any) {
}

// InitRoute 路由
func (s *Service) InitRoute() {
	s.RpcXServerListener.RpcServer.RegisterName(
		rpcx.GetDefaultServiceName(), // 服务key
		s,                            // 服务载体
		fmt.Sprintf("name=%s&listen=%s", s.GetName(), s.GetListen()), // worker元数据补充
	)
}

// HandleConnClose 处理连接关闭
func (s *Service) HandleConnClose(conn net.Conn) {
	s.tagRWMutex.Lock()
	defer s.tagRWMutex.Unlock()
	for _address, _conn := range s.Tag2Conn {
		// 连接关闭，tag注销
		if _conn == conn {
			delete(s.Tag2Conn, _address)
			logger.Print("【%s】conn close-->rpcXClient-tag【%s】", s.GetName(), _address)
			break
		}
	}
}

/*
------------------------------ rpc方法  ------------------------------
*/

// DoApi 接口处理
func (s *Service) DoApi(ctx context.Context, request *pb.ApiRequest, res *rpcx.Response) error {
	err := s.ApiHandle.DoApi(ctx, request, res)
	return err
}

// ClientRegisterTag client注册别名
func (s *Service) ClientRegisterTag(ctx context.Context, args *param.ClientArgs, res *param.ClientRes) error {
	s.tagRWMutex.Lock()
	defer s.tagRWMutex.Unlock()
	_, ok := s.Tag2Conn[args.Tag]
	// 已注册
	if ok {
		//logger.Print("【%s】conn register-->rpcXClient-tag【%s】exists!", s.GetName(), args.Tag)
		return nil
	}

	s.Tag2Conn[args.Tag] = ctx.Value(rpcXServer.RemoteConnContextKey).(net.Conn)
	res.S = 1

	logger.Print("【%s】conn register-->rpcXClient-tag【%s】success!", s.GetName(), args.Tag)
	return nil
}

// SendCrossMsg 发送跨服消息
func (s *Service) SendCrossMsg(ctx context.Context, msg any, res *param.ClientRes) error {
	s.RpcDoAll(
		"SendMsgByAll",
		nil,
		msg,
	)

	return nil
}

// Reload 热更新
func (s *Service) Reload(ctx context.Context, files []string, res *param.AnyRes) error {
	logger.Print("reload %s  files: %s", s.GetName(), files)
	config.Manage.Reload(files...)

	return nil
}

// StopWorker 服务停止
func (s *Service) StopWorker(ctx context.Context, workerListen string, res *param.AnyRes) error {
	_curListen := s.GetListen()
	// listen不匹配
	if workerListen != "" && workerListen != _curListen {
		logger.Print("no workerUrl")
		return nil
	}
	gameservice.StopService("worker", _curListen)
	return nil
}

/*
	timer和modelLog没有自己的端口
	通过worker转发到timer和modelLog实现服务停止
*/

// StopTimer 停止定时器
func (s *Service) StopTimer(ctx context.Context, args any, res *param.AnyRes) error {
	s.tagRWMutex.RLock()
	defer s.tagRWMutex.RUnlock()
	for gatewayUrl, _ := range s.Tag2Conn {
		// 排除掉非定时器
		if !strings.Contains(gatewayUrl, "timer") {
			continue
		}
		s.RpcDo(gatewayUrl, "StopTimer", nil, nil)
	}
	return nil
}

// StopModelLog 停止数据写入
func (s *Service) StopModelLog(ctx context.Context, args any, res *param.AnyRes) error {
	s.tagRWMutex.RLock()
	defer s.tagRWMutex.RUnlock()
	for gatewayUrl, _ := range s.Tag2Conn {
		// 排除掉非modelLog
		if !strings.Contains(gatewayUrl, "modelLog") {
			continue
		}
		s.RpcDo(gatewayUrl, "StopModelLog", nil, nil)
	}
	return nil
}

/*
------------------------------ rpc方法  ------------------------------ end
*/

// RandTag 随机一个tag
func (s *Service) RandTag() string {
	for tag, _ := range s.Tag2Conn {
		// 排除掉定时器和modelLog
		if strings.Contains(tag, "timer") || strings.Contains(tag, "modelLog") {
			continue
		}
		return tag
	}

	return ""
}

// GetClient 获取client
func (s *Service) GetClient(tag string) net.Conn {
	s.tagRWMutex.RLock()
	defer s.tagRWMutex.RUnlock()
	_client, ok := s.Tag2Conn[tag]
	if !ok {
		return nil
	}

	return _client
}

// RpcDo 通知rpcXClient执行x行为
func (s *Service) RpcDo(gatewayUrl, methodName string, metadata map[string]string, data any) {
	_conn := s.GetClient(gatewayUrl)
	if _conn == nil {
		logger.WorkerLog.Warn(
			fmt.Sprintf(
				"gatewayUrl not exist!!\n gatewayUrl: %s methodName: %s metadata: %s data: %v",
				gatewayUrl, methodName, metadata, data,
			),
		)
		return
	}
	s.RpcXServerListener.RpcServer.SendMessage(
		_conn,
		"api",
		methodName,
		metadata,
		lib.Dumps(data),
	)
}

// RpcDoAll 通知所有rpcXClient执行x行为
func (s *Service) RpcDoAll(methodName string, metadata map[string]string, data any) {
	s.tagRWMutex.RLock()
	defer s.tagRWMutex.RUnlock()
	for gatewayUrl, _ := range s.Tag2Conn {
		// 排除掉定时器和modelLog
		if strings.Contains(gatewayUrl, "timer") || strings.Contains(gatewayUrl, "modelLog") {
			continue
		}
		s.RpcDo(gatewayUrl, methodName, metadata, data)
	}
}
